In [1]:
sc.version
Out[1]:
The word count program is considered the "hello world" of Big Data analytics. Given a text file, we want to count how many times each word occurs. An example follows.
Input file:
foo bar bar
baz bar
foo baz bar
Result:
(foo,2)
(bar,4)
(baz,2)
Task 1: given an input file data/la_divina_commedia.txt, count how many times each word occurs in it.
In [19]:
{
sc.textFile("data/la_divina_commedia.txt")
  .flatMap(_.split(" "))                      // transformation: one record per word
  .map((_, 1))                                // transformation: pair each word with a count of 1
  .reduceByKey(_ + _)                         // transformation: sum the counts per word
  .saveAsTextFile("data/commedia_counts.txt") // action (triggers the computation)
}
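To eyeball the result without writing to disk, the same pipeline can end with a different action. A minimal sketch (assuming the same input file; the filter on empty tokens and the choice of ten words are additions for illustration) that prints the most frequent words with takeOrdered:
{
sc.textFile("data/la_divina_commedia.txt")
  .flatMap(_.split(" "))
  .filter(_.nonEmpty)                                 // drop empty tokens left by repeated spaces
  .map((_, 1))
  .reduceByKey(_ + _)
  .takeOrdered(10)(Ordering.by { case (_, n) => -n }) // action: the 10 words with the highest counts
  .foreach(println)
}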
Large dataset analysis is the main use case of Spark. However, Spark can also be used for compute-intensive tasks. Monte Carlo $\pi$ estimation is a good example problem.
Idea: throw random darts at a square with an inscribed circle. The ratio $\frac{A_{circle}}{A_{square}} = \frac{\pi}{4}$ is roughly equal to the fraction of darts that fall inside the circle, so $\pi \approx 4 \cdot \frac{darts_{in}}{darts_{total}}$.
In [18]:
{
  val n = 10000000
  val count = sc.parallelize(1 to n)  // distribute n dart throws across the cluster
    .map { _ =>
      val x = math.random             // dart position in the unit square
      val y = math.random
      if (x*x + y*y < 1) 1 else 0     // 1 if it lands in the quarter circle (same pi/4 ratio)
    }
    .reduce(_ + _)                    // action: count the darts inside the circle
  val pi = 4.0 * count / n            // the fraction estimates pi/4
  println(pi)
}
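One caveat: math.random draws from a shared, unseeded generator, so two runs will not produce the same estimate. A sketch of a reproducible variant, seeding one generator per partition with mapPartitionsWithIndex (the seed 42 and the 8 slices are arbitrary choices):
{
  val n = 10000000
  val count = sc.parallelize(1 to n, numSlices = 8)
    .mapPartitionsWithIndex { (idx, iter) =>
      val rng = new scala.util.Random(42 + idx) // one seeded generator per partition
      iter.map { _ =>
        val x = rng.nextDouble
        val y = rng.nextDouble
        if (x*x + y*y < 1) 1 else 0
      }
    }
    .reduce(_ + _)
  println(4.0 * count / n)
}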
When Spark was first implemented, the motivation for a new framework was the lack of dataset caching in MapReduce (and Hadoop). This penalizes applications that need to access a hot dataset iteratively. Building a K-Nearest Neighbour (KNN) classifier is a nice example that falls into this class of problems.
Supervised classification: given a dataset of labelled examples $(x_i, l_i)$, $i = 1 \dots N$, where $x_i$ is a vector in a fixed-dimensional space and $l_i$ is a class label, we aim to build a model that classifies new unlabelled examples $(x_j, ?)$, $j = N+1 \dots M$.
KNN, idea: for each new example $(x_j, ?)$, $j = N+1 \dots M$, compute the (Euclidean) distance from each known example $(x_i, l_i)$, $i = 1 \dots N$. Label $(x_j, ?)$ with the most frequent class among its K nearest neighbours.
In [17]:
{
  // Generate some labelled examples in the unit square
  val examples = sc.parallelize(1 to 300).map { _ =>
    val xi = Array(math.random, math.random)
    val label = if (xi(1) > xi(0)) "yellow" else "purple"
    (xi, label)
  }
  // Save 20% of the examples for testing
  val split = examples.randomSplit(Array(0.8, 0.2))
  val dataset = split(0).cache   // cache the training set in memory
  val testset = split(1).collect // assume M small
  // Make some predictions with 1NN
  val predictions = testset.map { case (x, _) => // unseen
    dataset.map { case (z, label) =>             // known label
      val d0 = z(0) - x(0)
      val d1 = z(1) - x(1)
      val dist = math.sqrt(d0*d0 + d1*d1)
      (dist, label)
    }
    .sortBy { case (dist, label) => dist } // transformation: nearest first
    .first                                 // action: the single nearest neighbour
    ._2                                    // return its label
  }
  // Evaluate our predictions
  val correct = testset.zip(predictions).count {
    case ((_, label), prediction) => label == prediction
  }
  val correctFrac = correct.toDouble / testset.length
  println(s"fraction of correct predictions $correctFrac")
}
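The cell above is the special case K = 1. A minimal sketch of the general majority vote, assuming dataset and testset as defined above and an arbitrary K = 5, using takeOrdered to fetch the K closest examples:
{
  val k = 5 // hypothetical choice of K
  val knnPredictions = testset.map { case (x, _) =>
    dataset.map { case (z, label) =>
      val d0 = z(0) - x(0)
      val d1 = z(1) - x(1)
      (math.sqrt(d0*d0 + d1*d1), label)
    }
    .takeOrdered(k)(Ordering.by { case (dist, _) => dist }) // action: the k nearest neighbours
    .groupBy { case (_, label) => label }                   // group the k votes by class
    .maxBy { case (_, votes) => votes.length }              // pick the majority class
    ._1
  }
}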
Please open an issue here: https://goo.gl/dOy089.